{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 07: RDD Joins\n",
    "\n",
    "> **Note:** This example is obsolete. Use [Dataset](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset) (SQL) joins instead. They are far more flexible and performant!\n",
    "\n",
    "Joins are a familiar concept in databases and Spark supports them, too. Joins at very large scale can be quite expensive, although a number of optimizations have been developed, some of which require programmer intervention to use. We won't discuss the details here, but it's worth reading how joins are implemented in various *Big Data* systems, such as [this discussion for Hive joins](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins#LanguageManualJoins-JoinOptimization) and the **Joins** section of [Hadoop: The Definitive Guide](http://shop.oreilly.com/product/0636920021773.do).\n",
    "\n",
    "Here, we will join the KJV Bible data with a small \"table\" that maps the book abbreviations to the full names, e.g., `Gen` to `Genesis`.\n",
    "\n",
    "See the corresponding Spark job [Joins7.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/Joins7.scala)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "in = ../data/kjvdat.txt\n",
       "abbrevToNames = ../data/abbrevs-to-names.tsv\n",
       "out = output/kjv-joins\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "output/kjv-joins"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val in = \"../data/kjvdat.txt\"                       // '|' separated\n",
    "val abbrevToNames = \"../data/abbrevs-to-names.tsv\"  // tab separated\n",
    "val out = \"output/kjv-joins\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This time, we won't use the `toText` method we defined before. We'll split on `|` as before, but return a new record, `(book, (chapter#, verse#, verse))`, keyed by the book abbreviation so that we can join on it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "input = MapPartitionsRDD[2] at map at <console>:30\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[2] at map at <console>:30"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val input = sc.textFile(in)\n",
    "  .map { line =>\n",
    "    val ary = line.split(\"\\\\s*\\\\|\\\\s*\")\n",
    "    (ary(0), (ary(1), ary(2), ary(3)))\n",
    "  }"
   ]
  },
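  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see what this split produces, here is a minimal sketch on a single line. The sample line is an assumption about the `|`-separated layout of `kjvdat.txt`, and the names `sampleLine`, `fields`, and `record` are introduced only for illustration. It needs no Spark, just the same regular expression."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Assumed sample line in the same '|'-separated layout as kjvdat.txt (illustration only).\n",
    "val sampleLine = \"Gen|1|1| In the beginning God created the heaven and the earth.~\"\n",
    "// The regex consumes any whitespace on either side of each '|'.\n",
    "val fields = sampleLine.split(\"\\\\s*\\\\|\\\\s*\")\n",
    "// Key by the book abbreviation; keep the rest as a nested tuple.\n",
    "val record = (fields(0), (fields(1), fields(2), fields(3)))\n",
    "println(record)\n",
    "// Expected to print: (Gen,(1,1,In the beginning God created the heaven and the earth.~))"
   ]
  },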
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now load the abbreviations, splitting each line on whitespace (the tab separator), but only at its first occurrence, since some book names contain more than one word."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "abbrevs = MapPartitionsRDD[5] at map at <console>:30\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[5] at map at <console>:30"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val abbrevs = sc.textFile(abbrevToNames)\n",
    "  .map { line =>\n",
    "    val ary = line.split(\"\\\\s+\", 2)\n",
    "    (ary(0), ary(1).trim)  // I've noticed trailing whitespace... so trim it\n",
    "  }"
   ]
  },
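  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The second argument to `split` is what keeps multi-word names intact. Here is a small sketch on one line; the sample string is an assumption about the layout of the abbreviations file (the `Ti1` to `1 Timothy` pairing is taken from the join output in this notebook), and the names `sampleAbbrev` and `parts` are illustrative."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Assumed sample line: abbreviation, a tab, the full name, and some trailing spaces.\n",
    "val sampleAbbrev = \"Ti1\\t1 Timothy  \"\n",
    "// A limit of 2 stops splitting after the first run of whitespace,\n",
    "// so the space inside \"1 Timothy\" is preserved.\n",
    "val parts = sampleAbbrev.split(\"\\\\s+\", 2)\n",
    "println((parts(0), parts(1).trim))\n",
    "// Expected to print: (Ti1,1 Timothy)"
   ]
  },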
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, for both datasets, the key is the first element, the Bible book abbreviation, so the RDD join is simple. Note that `join` is an inner join: only keys present in both RDDs appear in the result."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "verses = MapPartitionsRDD[8] at join at <console>:34\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[8] at join at <console>:34"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val verses = input.join(abbrevs)"
   ]
  },
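  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If the shape of a join result is unfamiliar, the following sketch may help. It uses made-up toy data (`toyLeft` and `toyRight` are hypothetical names, not part of the tutorial data) and assumes the `sc` provided by the notebook kernel. Each output record pairs the key with a tuple of the left and right values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Made-up toy data, for illustration only.\n",
    "val toyLeft  = sc.parallelize(Seq((\"a\", 1), (\"b\", 2), (\"c\", 3)))\n",
    "val toyRight = sc.parallelize(Seq((\"a\", \"apple\"), (\"b\", \"banana\")))\n",
    "// Inner join: the key \"c\" has no match on the right, so it is dropped.\n",
    "// Sort after collecting because join output order is not guaranteed.\n",
    "toyLeft.join(toyRight).collect.sortBy(_._1).foreach(println)\n",
    "// Expected to print:\n",
    "// (a,(1,apple))\n",
    "// (b,(2,banana))"
   ]
  },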
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Did we match every line?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "All records were matched!\n"
     ]
    }
   ],
   "source": [
    "if (input.count != verses.count) {\n",
    "  println(s\"input count, ${input.count}, doesn't match output count, ${verses.count}\")\n",
    "} else {\n",
    "  println(\"All records were matched!\")\n",
    "}"
   ]
  },
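  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Comparing counts tells us whether something went missing, but not what. It also assumes each abbreviation appears only once in the abbreviations file. One possible way to list the offending keys, sketched here with the hypothetical name `unmatched`, is `subtractByKey` from `PairRDDFunctions`, which keeps the records of `input` whose key does not occur in `abbrevs`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Verses whose book abbreviation has no entry in the abbreviations table.\n",
    "val unmatched = input.subtractByKey(abbrevs)\n",
    "// Print each distinct unmatched abbreviation; prints nothing if every key matched.\n",
    "unmatched.keys.distinct.collect.foreach(println)"
   ]
  },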
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's look at a few records to see what the join produced."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(Ti1,((1,1,Paul, an apostle of Jesus Christ by the commandment of God our Saviour, and Lord Jesus Christ, which is our hope;~),1 Timothy))\n",
      "(Ti1,((1,2,Unto Timothy, my own son in the faith: Grace, mercy, and peace, from God our Father and Jesus Christ our Lord.~),1 Timothy))\n",
      "(Ti1,((1,3,As I besought thee to abide still at Ephesus, when I went into Macedonia, that thou mightest charge some that they teach no other doctrine,~),1 Timothy))\n",
      "(Ti1,((1,4,Neither give heed to fables and endless genealogies, which minister questions, rather than godly edifying which is in faith: so do.~),1 Timothy))\n",
      "(Ti1,((1,5,Now the end of the commandment is charity out of a pure heart, and of a good conscience, and of faith unfeigned:~),1 Timothy))\n"
     ]
    }
   ],
   "source": [
    "verses.take(5).foreach(println)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "That's `(abbrev, ((chapter#, verse#, verse), full_name))`. Let's project out new records like the original, with the full name replacing the abbreviation. Note the use of \"deep\" pattern matching."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "verses2 = MapPartitionsRDD[9] at map at <console>:36\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "MapPartitionsRDD[9] at map at <console>:36"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val verses2 = verses.map {\n",
    "  // Drop the key - the abbreviated book name\n",
    "  case (_, ((chapter, verse, text), fullBookName)) => (fullBookName, chapter, verse, text)\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(1 Timothy,1,1,Paul, an apostle of Jesus Christ by the commandment of God our Saviour, and Lord Jesus Christ, which is our hope;~)\n",
      "(1 Timothy,1,2,Unto Timothy, my own son in the faith: Grace, mercy, and peace, from God our Father and Jesus Christ our Lord.~)\n",
      "(1 Timothy,1,3,As I besought thee to abide still at Ephesus, when I went into Macedonia, that thou mightest charge some that they teach no other doctrine,~)\n",
      "(1 Timothy,1,4,Neither give heed to fables and endless genealogies, which minister questions, rather than godly edifying which is in faith: so do.~)\n",
      "(1 Timothy,1,5,Now the end of the commandment is charity out of a pure heart, and of a good conscience, and of faith unfeigned:~)\n"
     ]
    }
   ],
   "source": [
    "verses2.take(5).foreach(println)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Save the output."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "lastException: Throwable = null\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "verses2.saveAsTextFile(out)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Are the books sorted properly? If not, any idea why not?"
   ]
  },
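  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Recap\n",
    "\n",
    "We joined the KJV verses with a small table of book names using the RDD `join` method, keyed on the book abbreviation, and then projected out records that carry the full book name. As noted at the top, prefer Dataset (SQL) joins for new code."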
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exercises"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 1: Try different sacred texts\n",
    "\n",
    "Does the Apocrypha file, `apodat.txt`, also work with the abbreviations file?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 2: Try Outer Joins\n",
    "\n",
    "See [PairRDDFunctions](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), which provides the `join` method we called. (An implicit conversion wraps an RDD of key-value pairs in it.) Try `leftOuterJoin`, `rightOuterJoin`, and `fullOuterJoin`."
   ]
  },
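  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a starting hint, here is a sketch of how an outer join changes the result type, using made-up toy data (`outerLeft` and `outerRight` are hypothetical names) and the `sc` provided by the notebook kernel. With `leftOuterJoin`, the right-hand value becomes an `Option`, so unmatched keys are kept with `None` rather than dropped."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Made-up toy data, for illustration only.\n",
    "val outerLeft  = sc.parallelize(Seq((\"a\", 1), (\"c\", 3)))\n",
    "val outerRight = sc.parallelize(Seq((\"a\", \"apple\")))\n",
    "// Every left-hand key is kept; a missing right-hand value shows up as None.\n",
    "outerLeft.leftOuterJoin(outerRight).collect.sortBy(_._1).foreach(println)\n",
    "// Expected to print:\n",
    "// (a,(1,Some(apple)))\n",
    "// (c,(3,None))"
   ]
  },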
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise 3: Preserve the original order\n",
    "\n",
    "The output does _not_ preserve the original order of the verses! This is a consequence of how joins are implemented (using \"co-groups\").\n",
    "\n",
    "Fix the ordering. Here is one approach: compute a map from book names (or abbreviations) to an index, e.g., `Gen -> 1`, `Exo -> 2`, etc., either here or in advance and stored in a file. Use this map to construct a sort key containing the book index, chapter number, and verse number. Note that the chapter and verse numbers are strings when extracted from the file, so you must convert them to integers using `x.toInt`. Finally, project out the full book name, chapter, verse, and text."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
